directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From seelm...@apache.org
Subject svn commit: r902620 [3/6] - in /directory/sandbox/seelmann/hbase-partition: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/directory/ src/main/java/org/apache/directory/server/ src/main/java/org/a...
Date Sun, 24 Jan 2010 19:04:39 GMT
Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/RowCounterMapper.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/RowCounterMapper.java?rev=902620&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/RowCounterMapper.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/RowCounterMapper.java Sun Jan 24 19:04:37 2010
@@ -0,0 +1,66 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.mapreduce;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+
+
+/**
+ * MapReduce mapper that counts the rows of an HBase table which contain at
+ * least one non-empty cell value, using a Hadoop job counter.
+ */
+public class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result>
+{
+    /** Counter enumeration to count the actual rows. */
+    private static enum Counters
+    {
+        ROWS
+    }
+
+
+    /**
+     * Maps the data: increments the ROWS counter once for each row that has
+     * at least one non-empty cell value, then stops inspecting the row.
+     * 
+     * @param row  The current table row key.
+     * @param values  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, 
+     *   org.apache.hadoop.mapreduce.Mapper.Context)
+     */
+    @Override
+    public void map( ImmutableBytesWritable row, Result values, Context context ) throws IOException
+    {
+        for ( KeyValue value : values.list() )
+        {
+            if ( value.getValue().length > 0 )
+            {
+                context.getCounter( Counters.ROWS ).increment( 1 );
+                // one non-empty cell is enough to count the row
+                break;
+            }
+        }
+    }
+
+}

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/SimpleMapper.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/SimpleMapper.java?rev=902620&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/SimpleMapper.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/SimpleMapper.java Sun Jan 24 19:04:37 2010
@@ -0,0 +1,38 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.mapreduce;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+
+/**
+ * No-op mapper: consumes all input records without emitting any output.
+ */
+public class SimpleMapper extends Mapper<Text, Text, Text, Text>
+{
+
+    /**
+     * Intentionally does nothing with the input record.
+     *
+     * @param key the input key
+     * @param value the input value
+     * @param context the task context
+     * @throws IOException declared to satisfy the Mapper contract
+     */
+    @Override
+    public void map( Text key, Text value, Context context ) throws IOException
+    {
+        // intentionally empty: records are discarded
+    }
+
+}

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java?rev=902620&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java Sun Jan 24 19:04:37 2010
@@ -0,0 +1,232 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.table;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+
+import org.apache.directory.server.core.partition.hbase.Cache;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+import org.apache.directory.shared.ldap.util.Base64;
+import org.apache.directory.shared.ldap.util.ByteBuffer;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * HBase specific implementation of a user index table that only uses one
+ * row per value and stores all candidates within the "info" column family.
+ * This implementation is intended to be used for values with only one or few
+ * candidates, an example is the uid attribute that is unique across the directory.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class HBaseColumnIndexTable extends HBaseIndexTableBase
+{
+    private static final Logger LOG = LoggerFactory.getLogger( HBaseColumnIndexTable.class );
+
+    /** Caches the parsed index row (count + candidates), keyed by the Base64
+     *  encoded row key.
+     *  NOTE(review): entries are never invalidated by add()/delete(), so
+     *  count()/exists() may return stale data after a modification -- confirm
+     *  this is intended. */
+    private Cache<String, Info> infoCache;
+
+
+    /**
+     * Creates a new column index table for the given attribute type.
+     *
+     * @param attributeTypeOid the OID of the indexed attribute type
+     * @param schemaManager the schema manager, used for normalization
+     * @param tablePrefix the prefix of the HBase table name
+     * @throws Exception if the base initialization fails
+     */
+    public HBaseColumnIndexTable( String attributeTypeOid, SchemaManager schemaManager, String tablePrefix )
+        throws Exception
+    {
+        super( attributeTypeOid, schemaManager, tablePrefix );
+        this.infoCache = new Cache<String, Info>();
+    }
+
+
+    /**
+     * Gets the number of candidate entries for the given value.
+     *
+     * @param value the attribute value
+     * @return the candidate count, 0 if the value is not indexed
+     * @throws Exception if the HBase access or normalization fails
+     */
+    public int count( Object value ) throws Exception
+    {
+        Info info = fetchInfo( getCountKey( value ) );
+        return info.count.intValue();
+    }
+
+
+    /**
+     * Builds a scan row key of the form <pre>#value</pre>.
+     * The FULL_SCAN_* and VALUE_SCAN_* markers are compared by identity
+     * (reference equality), not by content, to select the scan boundaries.
+     *
+     * @param value the attribute value, a FULL_SCAN_* marker, or null
+     * @param entryId a VALUE_SCAN_* marker; other values are ignored here
+     * @return the scan row key
+     * @throws Exception if normalization of the value fails
+     */
+    public byte[] getScanKey( Object value, byte[] entryId ) throws Exception
+    {
+        ByteBuffer bb = new ByteBuffer();
+
+        bb.append( '#' );
+
+        // add value
+        // there are special values to support attribute scan
+        if ( value == FULL_SCAN_START )
+        {
+            bb.append( FULL_SCAN_START );
+        }
+        else if ( value == FULL_SCAN_STOP )
+        {
+            bb.append( FULL_SCAN_STOP );
+        }
+        else if ( value != null )
+        {
+            byte[] normValue = getNormalized( value );
+            bb.append( normValue );
+        }
+
+        // add entryId
+        if ( entryId == VALUE_SCAN_START )
+        {
+            bb.append( VALUE_SCAN_START );
+        }
+        else if ( entryId == VALUE_SCAN_STOP )
+        {
+            bb.append( VALUE_SCAN_STOP );
+        }
+
+        return bb.copyOfUsedBytes();
+    }
+
+
+    /**
+     * Gets the candidate entry ids for the given value, fetching the index
+     * row from HBase (or the cache) first.
+     *
+     * @param value the attribute value
+     * @return the candidate entry ids, empty if the value is not indexed
+     * @throws Exception if the HBase access or normalization fails
+     */
+    public List<Long> getColumnCandidates( Object value ) throws Exception
+    {
+        Info info = fetchInfo( getCountKey( value ) );
+        return info.candidates;
+    }
+
+
+    /**
+     * Gets the candidate entry ids from an already fetched index row.
+     *
+     * @param result the fetched index row
+     * @return the candidate entry ids, empty if the row is empty
+     * @throws Exception if parsing the row fails
+     */
+    public List<Long> getColumnCandidates( Result result ) throws Exception
+    {
+        Info info = extractInfo( result );
+        return info.candidates;
+    }
+
+
+    /**
+     * Checks if an index (attributeType=value) for the entry id exists.
+     *
+     * @param value the exact value
+     * @param id the entry id
+     * @return true if the entry id is a candidate for the value
+     * @throws Exception if the HBase access or normalization fails
+     */
+    public boolean exists( Object value, Long id ) throws Exception
+    {
+        Info info = fetchInfo( getCountKey( value ) );
+        boolean exists = info.candidates.contains( id );
+        return exists;
+    }
+
+
+    /**
+     * Fetches the parsed index row for the given row key, consulting the
+     * cache first and falling back to an HBase Get on a miss.
+     */
+    private Info fetchInfo( byte[] row ) throws Exception
+    {
+        String key = String.valueOf( Base64.encode( row ) );
+        if ( infoCache.contains( key ) )
+        {
+            return infoCache.get( key );
+        }
+
+        Get get = new Get( row );
+        get.addFamily( INFO_FAMILY );
+        Result result = HBaseTableHelper.get( getIndexTablePool(), indexTableName, get );
+        return extractInfo( result );
+    }
+
+
+    /**
+     * Parses an index row into an Info object and caches it.
+     * Candidate ids are stored as 8-byte qualifiers in the info family;
+     * the count lives under COUNT_QUALIFIER.
+     * NOTE(review): the Info object is put into the cache before its fields
+     * are populated -- a concurrent reader could observe it half-filled;
+     * confirm single-threaded use or reorder.
+     */
+    private Info extractInfo( Result result ) throws Exception
+    {
+        byte[] row = result.getRow();
+        String key = String.valueOf( Base64.encode( row ) );
+
+        Info info = new Info();
+        info.value = getValueFromCountKey( row );
+        infoCache.put( key, info );
+
+        if ( result.isEmpty() )
+        {
+            // if the row doesn't exist we cache and return an empty info object
+            return info;
+        }
+
+        NavigableMap<byte[], byte[]> infoMap = result.getFamilyMap( INFO_FAMILY );
+        for ( byte[] qualifier : infoMap.keySet() )
+        {
+            if ( qualifier.length == 8 )
+            {
+                // an 8-byte qualifier is a candidate entry id
+                info.candidates.add( Bytes.toLong( qualifier ) );
+            }
+            else if ( Bytes.equals( COUNT_QUALIFIER, qualifier ) )
+            {
+                info.count = Bytes.toLong( result.getFamilyMap( INFO_FAMILY ).get( qualifier ) );
+            }
+        }
+        return info;
+    }
+
+
+    /**
+     * Adds an index for (value, id) and increments the per-value count,
+     * unless the index already exists.
+     */
+    protected void add( byte[] value, Long id ) throws Exception
+    {
+        // exact match (attribute=value): #value -> count, value, id
+        // check first if the index already exists because we won't increment the index count
+        byte[] exactCountRow = getCountKey( value );
+        Get exactGet = new Get( exactCountRow );
+        exactGet.addColumn( INFO_FAMILY, Bytes.toBytes( id ) );
+        if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
+        {
+            // get+put+put is not atomic!
+            Put exactPut = new Put( exactCountRow );
+            exactPut.setWriteToWAL( false );
+            exactPut.add( INFO_FAMILY, Bytes.toBytes( id ), Bytes.toBytes( id ) );
+            HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut );
+
+            // increment exact match count: value: -> count
+            HBaseTableHelper.increment( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
+                COUNT_QUALIFIER );
+        }
+    }
+
+
+    /**
+     * Deletes the index for (value, id) and decrements the per-value count,
+     * if the index exists.
+     */
+    protected void delete( byte[] value, Long id ) throws Exception
+    {
+        // exact match (attribute=value): #value -> count, value, id
+        // check first if the index exists because we won't decrement the index count otherwise
+        byte[] exactCountRow = getCountKey( value );
+        Get exactGet = new Get( exactCountRow );
+        exactGet.addColumn( INFO_FAMILY, Bytes.toBytes( id ) );
+        if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
+        {
+            Delete exactDel = new Delete( exactCountRow );
+            exactDel.deleteColumn( INFO_FAMILY, Bytes.toBytes( id ) );
+            HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel );
+
+            // decrement exact match count: #value:0 -> count
+            HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
+                COUNT_QUALIFIER );
+        }
+    }
+
+    /** Parsed content of one index row: the value, its count and candidate ids. */
+    class Info
+    {
+        // the (denormalized) attribute value extracted from the row key
+        Object value;
+        // number of candidates, 0 if the row does not exist
+        Long count = 0L;
+        // candidate entry ids
+        List<Long> candidates = new ArrayList<Long>();
+    }
+
+}

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java?rev=902620&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java Sun Jan 24 19:04:37 2010
@@ -0,0 +1,76 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.table;
+
+
+import org.apache.directory.shared.ldap.entry.EntryAttribute;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * Contract implemented by the HBase backed attribute index tables.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public interface HBaseIndexTable
+{
+
+    /** Column family that stores the index bookkeeping columns. */
+    byte[] INFO_FAMILY = Bytes.toBytes( "info" );
+
+    /** Qualifier of the entry id column. */
+    byte[] ID_QUALIFIER = Bytes.toBytes( "id" );
+
+    /** Qualifier of the candidate count column. */
+    byte[] COUNT_QUALIFIER = Bytes.toBytes( "count" );
+
+    /** Lower marker for scanning the entry ids of a single value (compared by identity). */
+    byte[] VALUE_SCAN_START = new byte[]
+        { 0x00 };
+
+    /** Upper marker for scanning the entry ids of a single value (compared by identity). */
+    byte[] VALUE_SCAN_STOP = new byte[]
+        { ( byte ) 0xFF };
+
+    /** Lower marker for scanning all values of the attribute (compared by identity). */
+    byte[] FULL_SCAN_START = new byte[]
+        { 0x00 };
+
+    /** Upper marker for scanning all values of the attribute (compared by identity). */
+    byte[] FULL_SCAN_STOP = new byte[]
+        { ( byte ) 0xFF };
+
+
+    /** Releases any resources held by this index table. */
+    void destroy() throws Exception;
+
+
+    /** Adds an index entry for each value of the given attribute. */
+    void add( EntryAttribute attribute, Long id ) throws Exception;
+
+
+    /** Removes the index entry for each value of the given attribute. */
+    void delete( EntryAttribute attribute, Long id ) throws Exception;
+
+
+    /** Returns the number of candidate entries for the given value. */
+    int count( Object value ) throws Exception;
+
+
+    /**
+     * Checks if an index (attributeType=value) for the entry ID exists.
+     *
+     * @param value the exact value
+     * @param id the entry id
+     * @return true if an index exists
+     * @throws Exception if the lookup fails
+     */
+    boolean exists( Object value, Long id ) throws Exception;
+
+
+    /** Opens a scanner over the underlying index table. */
+    ResultScanner getScanner( Scan scan ) throws Exception;
+
+}
\ No newline at end of file

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java?rev=902620&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java Sun Jan 24 19:04:37 2010
@@ -0,0 +1,176 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.table;
+
+
+import org.apache.directory.server.core.partition.hbase.Cache;
+import org.apache.directory.shared.ldap.entry.EntryAttribute;
+import org.apache.directory.shared.ldap.entry.Value;
+import org.apache.directory.shared.ldap.entry.client.ClientBinaryValue;
+import org.apache.directory.shared.ldap.schema.AttributeType;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+import org.apache.directory.shared.ldap.util.ByteBuffer;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * Base implementation of an HBase specific index table. Provides value
+ * normalization, lazy creation of the backing HBase table, and the count
+ * row key layout shared by the concrete index table implementations.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public abstract class HBaseIndexTableBase implements HBaseIndexTable
+{
+
+    /** OID of the indexed attribute type. */
+    protected String attributeTypeOid;
+    /** Name of the backing HBase table: &lt;tablePrefix&gt;index_&lt;attribute name&gt;. */
+    protected String indexTableName;
+    /** Lazily created; access only through getIndexTablePool(). */
+    private HTablePool indexTablePool;
+    /** Used to look up the attribute type and its equality normalizer. */
+    protected SchemaManager schemaManager;
+    // NOTE(review): countCache and existsCache are created here but never read
+    // or written in this class; presumably used by subclasses -- verify.
+    protected Cache<Object, Long> countCache;
+    protected Cache<Object, Boolean> existsCache;
+
+
+    /**
+     * Creates a new index table base.
+     *
+     * @param attributeTypeOid the OID of the indexed attribute type
+     * @param schemaManager the schema manager, used for name lookup and normalization
+     * @param tablePrefix the prefix of the HBase table name
+     * @throws Exception if the attribute name lookup fails
+     */
+    public HBaseIndexTableBase( String attributeTypeOid, SchemaManager schemaManager, String tablePrefix )
+        throws Exception
+    {
+        this.attributeTypeOid = attributeTypeOid;
+        this.schemaManager = schemaManager;
+        // use the primary (human readable) attribute name in the table name
+        String name = schemaManager.getGlobalOidRegistry().getPrimaryName( attributeTypeOid );
+        this.indexTableName = tablePrefix + "index_" + name;
+        this.countCache = new Cache<Object, Long>();
+        this.existsCache = new Cache<Object, Boolean>();
+    }
+
+
+    /**
+     * No-op: this base class holds no resources that need explicit release.
+     */
+    public void destroy() throws Exception
+    {
+    }
+
+
+    /**
+     * Opens a scanner over the index table.
+     *
+     * @param scan the scan specification
+     * @return the result scanner
+     * @throws Exception if the HBase access fails
+     */
+    public ResultScanner getScanner( Scan scan ) throws Exception
+    {
+        return HBaseTableHelper.getScanner( getIndexTablePool(), indexTableName, scan );
+    }
+
+
+    /**
+     * Adds an index entry for each value of the attribute, delegating the
+     * per-value work to the subclass.
+     *
+     * @param attribute the attribute whose values are indexed
+     * @param id the entry id
+     * @throws Exception if the HBase access fails
+     */
+    public void add( EntryAttribute attribute, Long id ) throws Exception
+    {
+        for ( Value<?> value : attribute )
+        {
+            add( value.getBytes(), id );
+        }
+    }
+
+
+    /** Adds a single (value, id) index entry; implemented by subclasses. */
+    protected abstract void add( byte[] value, Long id ) throws Exception;
+
+
+    /**
+     * Deletes the index entry for each value of the attribute, delegating
+     * the per-value work to the subclass.
+     *
+     * @param attribute the attribute whose values are un-indexed
+     * @param id the entry id
+     * @throws Exception if the HBase access fails
+     */
+    public void delete( EntryAttribute attribute, Long id ) throws Exception
+    {
+        for ( Value<?> value : attribute )
+        {
+            delete( value.getBytes(), id );
+        }
+    }
+
+
+    /** Deletes a single (value, id) index entry; implemented by subclasses. */
+    protected abstract void delete( byte[] value, Long id ) throws Exception;
+
+
+    /**
+     * Normalizes a value using the attribute type's equality normalizer.
+     * String values are normalized directly; byte[] values are wrapped in a
+     * ClientBinaryValue and may normalize to either binary or string form.
+     *
+     * @param value the value, must be a String or byte[]
+     * @return the normalized value as bytes
+     * @throws Exception if the value has an unexpected type or normalization fails
+     */
+    protected byte[] getNormalized( Object value ) throws Exception
+    {
+        AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid );
+        byte[] normValue;
+
+        if ( value instanceof String )
+        {
+            String s = at.getEquality().getNormalizer().normalize( ( String ) value );
+            normValue = Bytes.toBytes( s );
+        }
+        else if ( value instanceof byte[] )
+        {
+            Value<?> normalized = at.getEquality().getNormalizer()
+                .normalize( new ClientBinaryValue( ( byte[] ) value ) );
+            if ( normalized.isBinary() )
+            {
+                normValue = normalized.getBytes();
+            }
+            else
+            {
+                normValue = Bytes.toBytes( normalized.getString() );
+            }
+        }
+        else
+        {
+            throw new Exception( "Unexpected type: " + value );
+        }
+
+        return normValue;
+    }
+
+
+    /**
+     * Lazily creates the index table (if missing) and the table pool.
+     * NOTE(review): the lazy initialization is not synchronized -- concurrent
+     * first calls could create two pools; confirm single-threaded init.
+     *
+     * @return the table pool for the index table
+     * @throws Exception if table creation fails
+     */
+    protected HTablePool getIndexTablePool() throws Exception
+    {
+        if ( indexTablePool == null )
+        {
+            HBaseConfiguration configuration = HBaseTableHelper.getHBaseConfiguration();
+            // ensure table is created
+            HBaseTableHelper.createTable( configuration, indexTableName, INFO_FAMILY );
+            indexTablePool = new HTablePool( configuration, 16 );
+        }
+        return indexTablePool;
+    }
+
+
+    /**
+     * Gets the count key. 
+     * The key has the following syntax:
+     *   <pre>  #value</pre>
+     * where <code>value</code> is the normalized value.
+     *
+     * @param value the value
+     * @return the count row key for the value
+     * @throws Exception if normalization of the value fails
+     */
+    protected byte[] getCountKey( Object value ) throws Exception
+    {
+        ByteBuffer bb = new ByteBuffer();
+
+        bb.append( '#' );
+
+        byte[] normValue = getNormalized( value );
+        bb.append( normValue );
+
+        return bb.copyOfUsedBytes();
+    }
+
+
+    /**
+     * Extracts the value from a count row key by stripping the leading '#'
+     * marker byte.
+     *
+     * @param row the count row key, as produced by getCountKey()
+     * @return a String for human readable syntaxes, otherwise the raw bytes
+     * @throws Exception if the attribute type lookup fails
+     */
+    public Object getValueFromCountKey( byte[] row ) throws Exception
+    {
+        byte[] value = Bytes.tail( row, row.length - 1 );
+        AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid );
+        return at.getSyntax().isHumanReadable() ? Bytes.toString( value ) : value;
+    }
+
+}
\ No newline at end of file

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java?rev=902620&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java Sun Jan 24 19:04:37 2010
@@ -0,0 +1,695 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.table;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.directory.server.core.entry.DefaultServerAttribute;
+import org.apache.directory.server.core.entry.DefaultServerEntry;
+import org.apache.directory.server.core.entry.ServerAttribute;
+import org.apache.directory.server.core.entry.ServerBinaryValue;
+import org.apache.directory.server.core.entry.ServerEntry;
+import org.apache.directory.server.core.entry.ServerStringValue;
+import org.apache.directory.server.core.partition.hbase.Cache;
+import org.apache.directory.shared.ldap.entry.EntryAttribute;
+import org.apache.directory.shared.ldap.entry.Value;
+import org.apache.directory.shared.ldap.name.LdapDN;
+import org.apache.directory.shared.ldap.name.RDN;
+import org.apache.directory.shared.ldap.schema.AttributeType;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * The master table stores all relevant information of an entry:
+ * its parent, its user provided RDN, and its user provided attributes 
+ * and values.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class HBaseMasterTable
+{
+    public static final Long ROOT_ID = 0L;
+    public static final byte[] ROOT_ROW = Bytes.toBytes( ROOT_ID );
+    public static final byte[] SEQUENCE_ROW = Bytes.toBytes( ROOT_ID );
+    public static final byte[] SEQUENCE_QUALIFIER = Bytes.toBytes( "sequence" );
+
+    public static final byte[] TREE_INFO_FAMILY = Bytes.toBytes( "treeInfo" );
+    public static final byte[] ID_QUALIFIER = Bytes.toBytes( "id" );
+    public static final byte[] PARENT_ID_QUALIFIER = Bytes.toBytes( "parentId" );
+    public static final byte[] UP_RDN_QUALIFIER = Bytes.toBytes( "upRdn" );
+    public static final byte[] NORM_RDN_QUALIFIER = Bytes.toBytes( "normRdn" );
+
+    public static final byte[] UP_ATTRIBUTES_FAMILY = Bytes.toBytes( "upAttributes" );
+
+    public static final byte[] ONE_LEVEL_COUNT_QUALIFIER = Bytes.toBytes( "oneLevelCount" );
+    public static final byte[] SUB_LEVEL_COUNT_QUALIFIER = Bytes.toBytes( "subLevelCount" );
+
+    public static final char KEY_DELIMITER = ',';
+    public static final byte[] KEY_DELIMITER_BYTES = new byte[]
+        { KEY_DELIMITER };
+    public static final byte[] NORM_ATTRIBUTES_FAMILY = Bytes.toBytes( "normAttributes" );
+    public static final char NORM_ATTRIBUTE_DELIMITER = '=';
+    public static final byte[] NORM_ATTRIBUTE_DELIMITER_BYTES = new byte[]
+        { NORM_ATTRIBUTE_DELIMITER };
+
+    private static final String MASTER_TABLE = "master";
+    private static final String TREE_TABLE = "tree";
+
+    //    private HBaseStore store;
+    private SchemaManager schemaManager;
+    private LdapDN suffixDn;
+    //    private String tablePrefix;
+
+    private HTablePool masterTablePool;
+    private String masterTableName;
+    private HTablePool treeTablePool;
+    private String treeTableName;
+    private MasterTreeInfo suffixMti;
+
+    /** id -> master tree info */
+    private Cache<Long, MasterTreeInfo> mtiCache = new Cache<Long, MasterTreeInfo>();
+
+    /** master tree info -> id */
+    private Cache<String, Long> idCache = new Cache<String, Long>();
+
+    /** id -> one level count */
+    private Cache<Long, Long> oneLevelCountCache = new Cache<Long, Long>();
+
+    /** id -> sub level count */
+    private Cache<Long, Long> subLevelCountCache = new Cache<Long, Long>();
+
+    /** id -> DN */
+    private Cache<Long, LdapDN> dnCache = new Cache<Long, LdapDN>();
+
+    /** id -> entry */
+    private Cache<Long, ServerEntry> entryCache = new Cache<Long, ServerEntry>();
+
+
+    /**
+     * Creates a new master/tree table wrapper.
+     *
+     * @param schemaManager the schema manager used for normalization and lookups
+     * @param suffixDn the partition suffix DN
+     * @param tablePrefix prefix prepended to the HBase table names
+     */
+    public HBaseMasterTable( SchemaManager schemaManager, LdapDN suffixDn, String tablePrefix )
+    {
+        this.schemaManager = schemaManager;
+        this.suffixDn = suffixDn;
+        this.masterTableName = tablePrefix + MASTER_TABLE;
+        this.treeTableName = tablePrefix + TREE_TABLE;
+        // tree table key of the suffix entry: its parent is the virtual root
+        this.suffixMti = new MasterTreeInfo( ROOT_ID, suffixDn.getNormName(), null );
+    }
+
+
+    /**
+     * Releases resources held by this table. Currently a no-op; the
+     * lazily created HTable pools are not explicitly closed here.
+     */
+    public void destroy() throws Exception
+    {
+    }
+
+
+    /**
+     * Adds an entry to the master and tree tables, assigns it a fresh ID,
+     * and updates the one-level counter of the direct parent plus the
+     * sub-level counters of all ancestors.
+     * NOTE(review): the puts and the counter increments are separate HBase
+     * operations and are not atomic as a whole.
+     *
+     * @param entry the entry to add
+     * @return the newly assigned entry ID
+     * @throws Exception
+     */
+    public Long add( ServerEntry entry ) throws Exception
+    {
+        Long id = nextId();
+        List<Long> parentIds = fetchParentIds( entry.getDn(), false );
+        String upRdn;
+        String normRdn;
+        MasterTreeInfo treeTableKey;
+        // the suffix entry stores its complete DN as "RDN", all other
+        // entries only store their leaf RDN
+        if ( entry.getDn().equals( suffixDn ) )
+        {
+            upRdn = entry.getDn().getName();
+            normRdn = entry.getDn().getNormName();
+            treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn );
+        }
+        else
+        {
+            upRdn = entry.getDn().getRdn().getUpName();
+            normRdn = entry.getDn().getRdn().getNormName();
+            treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn );
+        }
+
+        // put to master and tree table
+        Put masterPut = new Put( Bytes.toBytes( id ) );
+        masterPut.add( TREE_INFO_FAMILY, PARENT_ID_QUALIFIER, Bytes.toBytes( parentIds.get( 0 ) ) );
+        masterPut.add( TREE_INFO_FAMILY, UP_RDN_QUALIFIER, Bytes.toBytes( upRdn ) );
+        masterPut.add( TREE_INFO_FAMILY, NORM_RDN_QUALIFIER, Bytes.toBytes( normRdn ) );
+        Put treePut = new Put( treeTableKey.treeTableKey );
+        treePut.add( TREE_INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) );
+        for ( EntryAttribute attribute : entry )
+        {
+            String attr = attribute.getUpId();
+            String attrOid = ( ( ServerAttribute ) attribute ).getAttributeType().getOid();
+            for ( int i = 0; i < attribute.size(); i++ )
+            {
+                Value<?> value = attribute.get( i );
+
+                // upAttributes: qualifier is the user provided attribute name
+                // plus a 4-byte value index, e.g.
+                // objectClass0 -> inetOrgPerson
+                // objectClass1 -> top
+                masterPut.add( UP_ATTRIBUTES_FAMILY, Bytes.add( Bytes.toBytes( attr ), Bytes.toBytes( i ) ), value
+                    .getBytes() );
+
+                // normAttributes: qualifier is OID '=' normalized value, e.g.
+                // 2.5.4.0:inetorgperson -> 0
+                // 2.5.4.0:top -> 1
+                //if ( store.hasUserIndexOn( attrOid ) )
+                {
+                    Object normalizedValue = value.getNormalizedValue();
+                    byte[] normalizedValueBytes;
+                    if ( normalizedValue instanceof String )
+                    {
+                        normalizedValueBytes = Bytes.toBytes( ( String ) normalizedValue );
+                    }
+                    else
+                    {
+                        normalizedValueBytes = ( byte[] ) normalizedValue;
+                    }
+                    treePut.add( NORM_ATTRIBUTES_FAMILY, Bytes.add( Bytes.toBytes( attrOid ),
+                        NORM_ATTRIBUTE_DELIMITER_BYTES, normalizedValueBytes ), Bytes.toBytes( i ) );
+                }
+            }
+        }
+        HBaseTableHelper.put( getMasterTablePool(), masterTableName, masterPut );
+        HBaseTableHelper.put( getTreeTablePool(), treeTableName, treePut );
+
+        // update parent one-level count (parentKey is null for the root,
+        // which carries no counters)
+        MasterTreeInfo parentKey = fetchMasterTreeInfo( parentIds.get( 0 ) );
+        if ( parentKey != null )
+        {
+            HBaseTableHelper.increment( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY,
+                ONE_LEVEL_COUNT_QUALIFIER );
+        }
+
+        // update all parents sub-level count
+        for ( Long parentId : parentIds )
+        {
+            parentKey = fetchMasterTreeInfo( parentId );
+            if ( parentKey != null )
+            {
+                HBaseTableHelper.increment( getTreeTablePool(), treeTableName, parentKey.treeTableKey,
+                    TREE_INFO_FAMILY, SUB_LEVEL_COUNT_QUALIFIER );
+            }
+        }
+
+        // clear caches (counters of the ancestors changed)
+        oneLevelCountCache.clear();
+        subLevelCountCache.clear();
+
+        return id;
+    }
+
+
+    /**
+     * Deletes an entry from the master and tree tables, decrements the
+     * one-level counter of the direct parent and the sub-level counters of
+     * all ancestors, then clears all caches. The entry parameter is
+     * currently unused.
+     *
+     * @param id the ID of the entry to delete
+     * @param entry the entry (unused)
+     * @throws Exception
+     */
+    public void delete( Long id, ServerEntry entry ) throws Exception
+    {
+        MasterTreeInfo key = fetchMasterTreeInfo( id );
+
+        // delete in master table
+        Delete masterDel = new Delete( Bytes.toBytes( id ) );
+        HBaseTableHelper.delete( getMasterTablePool(), masterTableName, masterDel );
+
+        // delete in tree table
+        Delete treeDel = new Delete( key.treeTableKey );
+        HBaseTableHelper.delete( getTreeTablePool(), treeTableName, treeDel );
+
+        // update parent one-level count (the virtual root carries no counters)
+        Long parentId = key.parentId;
+        if ( parentId > ROOT_ID )
+        {
+            MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId );
+            HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY,
+                ONE_LEVEL_COUNT_QUALIFIER );
+        }
+
+        // update sub-level count of all parents, walking up to the root
+        while ( parentId > ROOT_ID )
+        {
+            MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId );
+            HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY,
+                SUB_LEVEL_COUNT_QUALIFIER );
+            parentId = parentKey.parentId;
+        }
+
+        // clear caches
+        mtiCache.clear();
+        idCache.clear();
+        dnCache.clear();
+        entryCache.clear();
+        oneLevelCountCache.clear();
+        subLevelCountCache.clear();
+    }
+
+
+    /**
+     * Modifies an entry by deleting all its attribute columns and writing
+     * the attributes of the given entry afresh.
+     * TODO: replace quick-and-dirty implementation (delete+put) with better algorithm
+     *
+     * @param id the entry ID
+     * @param entry the modified entry whose attributes replace the stored ones
+     * @throws Exception
+     */
+    public void modify( Long id, ServerEntry entry ) throws Exception
+    {
+        MasterTreeInfo ttk = fetchMasterTreeInfo( id );
+
+        // delete complete attribute family
+        Delete masterDel = new Delete( Bytes.toBytes( id ) );
+        masterDel.deleteFamily( UP_ATTRIBUTES_FAMILY );
+        HBaseTableHelper.delete( getMasterTablePool(), masterTableName, masterDel );
+        // fixed: the tree table is keyed by the tree table key (parentId +
+        // normRdn), not by the entry ID - deleting at Bytes.toBytes( id )
+        // targeted the wrong row (compare add() and delete())
+        Delete treeDel = new Delete( ttk.treeTableKey );
+        treeDel.deleteFamily( NORM_ATTRIBUTES_FAMILY );
+        HBaseTableHelper.delete( getTreeTablePool(), treeTableName, treeDel );
+
+        // dirty workaround:
+        // we need to avoid that the new attributes are written in the same 
+        // millisecond as the old attributes were deleted.
+        Thread.sleep( 20 );
+
+        // add all attributes
+        // TODO: duplicate code with add()
+        Put masterPut = new Put( Bytes.toBytes( id ) );
+        Put treePut = new Put( ttk.treeTableKey );
+        for ( EntryAttribute attribute : entry )
+        {
+            String attr = attribute.getUpId();
+            String attrOid = ( ( ServerAttribute ) attribute ).getAttributeType().getOid();
+            for ( int i = 0; i < attribute.size(); i++ )
+            {
+                Value<?> value = attribute.get( i );
+
+                // upAttributes:
+                // objectClass:0 -> top
+                // objectClass:1 -> inetOrgPerson
+                masterPut.add( UP_ATTRIBUTES_FAMILY, Bytes.add( Bytes.toBytes( attr ), Bytes.toBytes( i ) ), value
+                    .getBytes() );
+
+                // normAttributes:
+                // 2.5.4.0:top -> top
+                // 2.5.4.0:inetorgperson -> inetorgperson
+                //if ( store.hasUserIndexOn( attrOid ) )
+                {
+                    Object normalizedValue = value.getNormalizedValue();
+                    byte[] normalizedValueBytes;
+                    if ( normalizedValue instanceof String )
+                    {
+                        normalizedValueBytes = Bytes.toBytes( ( String ) normalizedValue );
+                    }
+                    else
+                    {
+                        normalizedValueBytes = ( byte[] ) normalizedValue;
+                    }
+                    treePut.add( NORM_ATTRIBUTES_FAMILY, Bytes.add( Bytes.toBytes( attrOid ),
+                        NORM_ATTRIBUTE_DELIMITER_BYTES, normalizedValueBytes ), Bytes.toBytes( i ) );
+                }
+            }
+        }
+        HBaseTableHelper.put( getMasterTablePool(), masterTableName, masterPut );
+        HBaseTableHelper.put( getTreeTablePool(), treeTableName, treePut );
+    }
+
+
+    /**
+     * Fetch the entry by its ID.
+     * 
+     * @param id the entry id
+     * @return the server entry, null if entry doesn't exist.
+     * @throws Exception the exception
+     */
+    public ServerEntry fetchEntry( Long id ) throws Exception
+    {
+        // the virtual root has no entry representation
+        if ( id == null || id.equals( ROOT_ID ) )
+        {
+            return null;
+        }
+        if ( entryCache.contains( id ) )
+        {
+            return entryCache.get( id );
+        }
+
+        Get entryGet = new Get( Bytes.toBytes( id ) );
+        entryGet.addColumn( TREE_INFO_FAMILY, ID_QUALIFIER );
+        entryGet.addColumn( TREE_INFO_FAMILY, PARENT_ID_QUALIFIER );
+        entryGet.addColumn( TREE_INFO_FAMILY, UP_RDN_QUALIFIER );
+        entryGet.addColumn( TREE_INFO_FAMILY, NORM_RDN_QUALIFIER );
+        // fetch the whole attribute family; use addFamily instead of the
+        // deprecated one-argument addColumn, consistent with fetchNormAttributes()
+        entryGet.addFamily( UP_ATTRIBUTES_FAMILY );
+        Result result = HBaseTableHelper.get( getMasterTablePool(), masterTableName, entryGet );
+        if ( result.getRow() == null )
+        {
+            return null;
+        }
+        ServerEntry entry = convertToServerEntry( id, result );
+        entryCache.put( id, entry );
+        return entry;
+    }
+
+
+    public ServerEntry convertToServerEntry( Long id, Result result ) throws Exception
+    {
+        Long parentId = Bytes.toLong( result.getValue( TREE_INFO_FAMILY, PARENT_ID_QUALIFIER ) );
+        String normRdn = Bytes.toString( result.getValue( TREE_INFO_FAMILY, UP_RDN_QUALIFIER ) );
+        String upRdn = Bytes.toString( result.getValue( TREE_INFO_FAMILY, UP_RDN_QUALIFIER ) );
+        MasterTreeInfo mti = new MasterTreeInfo( parentId, normRdn, upRdn );
+        LdapDN dn = fetchDn( id, mti );
+
+        NavigableMap<byte[], byte[]> attributesMap = result.getFamilyMap( UP_ATTRIBUTES_FAMILY );
+        ServerEntry entry = new DefaultServerEntry( schemaManager, dn );
+        for ( Map.Entry<byte[], byte[]> attributeEntry : attributesMap.entrySet() )
+        {
+            byte[] qualifier = attributeEntry.getKey();
+            String attributeDescription = Bytes.toString( attributeEntry.getKey(), 0, qualifier.length - 4 );
+
+            EntryAttribute attribute = entry.get( attributeDescription );
+            AttributeType attributeType = schemaManager.getAttributeTypeRegistry().lookup( attributeDescription );
+            if ( attribute == null )
+            {
+                attribute = new DefaultServerAttribute( attributeDescription, attributeType );
+                entry.add( attribute );
+            }
+            Value<?> value;
+            if ( attribute.isHR() )
+            {
+                value = new ServerStringValue( attributeType, Bytes.toString( attributeEntry.getValue() ) );
+            }
+            else
+            {
+                value = new ServerBinaryValue( attributeType, attributeEntry.getValue() );
+            }
+            attribute.add( value );
+        }
+        return entry;
+    }
+
+
+    /**
+     * Builds the DN of an entry by concatenating the stored RDNs while
+     * walking the parent chain up to the root, then normalizes the result.
+     *
+     * @param id the entry ID, used as cache key
+     * @param mti the master tree info of the entry
+     * @return the normalized DN
+     * @throws Exception on lookup or normalization failure
+     */
+    private LdapDN fetchDn( Long id, MasterTreeInfo mti ) throws Exception
+    {
+        if ( dnCache.contains( id ) )
+        {
+            return dnCache.get( id );
+        }
+
+        // collect the RDNs from the entry up to (excluding) the virtual root
+        StringBuilder dnBuffer = new StringBuilder();
+        MasterTreeInfo current = mti;
+        do
+        {
+            dnBuffer.append( current.upName );
+            current = fetchMasterTreeInfo( current.parentId );
+            if ( current != null )
+            {
+                dnBuffer.append( ',' );
+            }
+        }
+        while ( current != null );
+
+        LdapDN dn = new LdapDN( dnBuffer.toString() );
+        dn.normalize( schemaManager.getAttributeTypeRegistry().getNormalizerMapping() );
+
+        dnCache.put( id, dn );
+        return dn;
+    }
+
+
+    /**
+     * Looks up the ID of the entry with the given DN.
+     *
+     * @param dn the entry DN
+     * @return the entry ID, null if the entry doesn't exist
+     * @throws Exception
+     */
+    public Long fetchId( LdapDN dn ) throws Exception
+    {
+        if ( dn == null )
+        {
+            return null;
+        }
+        List<Long> path = fetchParentIds( dn, true );
+        if ( path == null )
+        {
+            return null;
+        }
+        // the entry itself is the first element of the resolved path
+        return path.get( 0 );
+    }
+
+
+    /**
+     * Looks up the parent ID of the given entry.
+     *
+     * @param id the entry ID
+     * @return the parent ID, null if the entry doesn't exist
+     * @throws Exception
+     */
+    public Long fetchParentId( Long id ) throws Exception
+    {
+        // the virtual root has no parent
+        if ( id == null || id.equals( ROOT_ID ) )
+        {
+            return null;
+        }
+        MasterTreeInfo info = fetchMasterTreeInfo( id );
+        return info == null ? null : info.parentId;
+    }
+
+
+    /**
+     * Fetches the normalized attributes of an entry from the tree table.
+     *
+     * @param id the entry ID
+     * @return the map of normalized attribute qualifiers to value indexes,
+     *         null if the entry doesn't exist
+     * @throws Exception
+     */
+    public NavigableMap<byte[], byte[]> fetchNormAttributes( Long id ) throws Exception
+    {
+        MasterTreeInfo mti = fetchMasterTreeInfo( id );
+        if ( mti == null )
+        {
+            // entry doesn't exist (or id is null/the virtual root): avoid the
+            // NPE on mti.treeTableKey, consistent with the other fetch* methods
+            return null;
+        }
+        Get get = new Get( mti.treeTableKey );
+        get.addFamily( NORM_ATTRIBUTES_FAMILY );
+        Result result = HBaseTableHelper.get( getTreeTablePool(), treeTableName, get );
+        return result.getFamilyMap( NORM_ATTRIBUTES_FAMILY );
+    }
+
+
+    /**
+     * Gets the number of direct children of the given entry.
+     *
+     * @param id the entry ID
+     * @return the one level count, 0 if the entry doesn't exist
+     * @throws Exception
+     */
+    public int getOneLevelCount( Long id ) throws Exception
+    {
+        if ( oneLevelCountCache.contains( id ) )
+        {
+            return oneLevelCountCache.get( id ).intValue();
+        }
+
+        MasterTreeInfo info = fetchMasterTreeInfo( id );
+        if ( info == null )
+        {
+            return 0;
+        }
+        // the counter cell defaults to 0 if it was never incremented
+        Long children = HBaseTableHelper.getLongValue( getTreeTablePool(), treeTableName, info.treeTableKey,
+            TREE_INFO_FAMILY, ONE_LEVEL_COUNT_QUALIFIER, 0L );
+
+        oneLevelCountCache.put( id, children );
+        return children.intValue();
+    }
+
+
+    /**
+     * Gets the number of all descendants of the given entry.
+     *
+     * @param id the entry ID
+     * @return the sub level count, 0 if the entry doesn't exist
+     * @throws Exception
+     */
+    public int getSubLevelCount( Long id ) throws Exception
+    {
+        if ( subLevelCountCache.contains( id ) )
+        {
+            return subLevelCountCache.get( id ).intValue();
+        }
+
+        MasterTreeInfo info = fetchMasterTreeInfo( id );
+        if ( info == null )
+        {
+            return 0;
+        }
+        // the counter cell defaults to 0 if it was never incremented
+        Long descendants = HBaseTableHelper.getLongValue( getTreeTablePool(), treeTableName, info.treeTableKey,
+            TREE_INFO_FAMILY, SUB_LEVEL_COUNT_QUALIFIER, 0L );
+
+        subLevelCountCache.put( id, descendants );
+        return descendants.intValue();
+    }
+
+
+    /**
+     * Opens a scanner over the tree table.
+     *
+     * @param scan the scan specification
+     * @return the result scanner
+     * @throws Exception
+     */
+    public ResultScanner getScanner( Scan scan ) throws Exception
+    {
+        return HBaseTableHelper.getScanner( getTreeTablePool(), treeTableName, scan );
+    }
+
+
+    /**
+     * Resolves the chain of ancestor IDs for the given DN, ordered from the
+     * nearest ancestor down to the virtual root (the last element is always
+     * ROOT_ID). If includeChild is true the ID of the entry itself is
+     * resolved as well and becomes element 0.
+     *
+     * @param dn the entry DN
+     * @param includeChild whether to resolve the entry itself too
+     * @return the ID chain, null if the DN is outside the suffix or an
+     *         ancestor (or, with includeChild, the entry itself) doesn't exist
+     * @throws Exception
+     */
+    private List<Long> fetchParentIds( LdapDN dn, boolean includeChild ) throws Exception
+    {
+        if ( !dn.startsWith( suffixDn ) )
+        {
+            // the DN does not belong to this partition
+            return null;
+        }
+        List<Long> path = new ArrayList<Long>();
+        path.add( ROOT_ID );
+
+        if ( !dn.equals( suffixDn ) || includeChild )
+        {
+            Long parentId = getSuffixId();
+            if ( parentId == null )
+            {
+                return null;
+            }
+            path.add( 0, parentId );
+
+            // walk the RDNs below the suffix; stop one RDN early unless the
+            // entry itself should be resolved too
+            int stop = includeChild ? dn.size() : dn.size() - 1;
+            for ( int i = suffixDn.size(); i < stop; i++ )
+            {
+                RDN rdn = dn.getRdn( i );
+                MasterTreeInfo mti = new MasterTreeInfo( parentId, rdn.getNormName(), null );
+                parentId = fetchId( mti );
+                if ( parentId == null )
+                {
+                    return null;
+                }
+                path.add( 0, parentId );
+            }
+        }
+
+        return path;
+    }
+
+
+    /**
+     * Gets the ID of the suffix entry, null if it wasn't added yet.
+     */
+    private Long getSuffixId() throws Exception
+    {
+        return fetchId( suffixMti );
+    }
+
+
+    /**
+     * Allocates the next entry ID by incrementing the sequence cell in the
+     * master table.
+     * NOTE(review): relies on the increment being atomic across clients -
+     * confirm the HBaseTableHelper.increment implementation.
+     *
+     * @return the newly allocated ID
+     */
+    private long nextId() throws Exception
+    {
+        byte[] id = HBaseTableHelper.increment( getMasterTablePool(), masterTableName, SEQUENCE_ROW, TREE_INFO_FAMILY,
+            SEQUENCE_QUALIFIER );
+        return Bytes.toLong( id );
+    }
+
+
+    /**
+     * Lazily creates the master table (if missing) and its table pool.
+     * NOTE(review): the lazy initialization is not thread-safe; confirm
+     * single-threaded access or add synchronization.
+     *
+     * @return the master table pool
+     */
+    private HTablePool getMasterTablePool() throws Exception
+    {
+        if ( masterTablePool == null )
+        {
+            HBaseConfiguration configuration = HBaseTableHelper.getHBaseConfiguration();
+            // ensure table is created
+            HBaseTableHelper.createTable( configuration, masterTableName, TREE_INFO_FAMILY, UP_ATTRIBUTES_FAMILY );
+            masterTablePool = new HTablePool( configuration, 16 );
+        }
+        return masterTablePool;
+    }
+
+
+    /**
+     * Lazily creates the tree table (if missing) and its table pool.
+     * NOTE(review): the lazy initialization is not thread-safe; confirm
+     * single-threaded access or add synchronization.
+     *
+     * @return the tree table pool
+     */
+    private HTablePool getTreeTablePool() throws Exception
+    {
+        if ( treeTablePool == null )
+        {
+            HBaseConfiguration configuration = HBaseTableHelper.getHBaseConfiguration();
+            // ensure table is created
+            HBaseTableHelper.createTable( configuration, treeTableName, TREE_INFO_FAMILY, NORM_ATTRIBUTES_FAMILY );
+            treeTablePool = new HTablePool( configuration, 16 );
+        }
+        return treeTablePool;
+    }
+
+
+    /**
+     * Looks up the entry ID stored under the given tree table key.
+     *
+     * @param mti the tree table key components
+     * @return the entry ID, null if no such row exists
+     * @throws Exception
+     */
+    private Long fetchId( MasterTreeInfo mti ) throws Exception
+    {
+        if ( idCache.contains( mti.string ) )
+        {
+            return idCache.get( mti.string );
+        }
+
+        Long entryId = HBaseTableHelper.getLongValue( getTreeTablePool(), treeTableName, mti.treeTableKey,
+            TREE_INFO_FAMILY, ID_QUALIFIER, null );
+        if ( entryId != null )
+        {
+            idCache.put( mti.string, entryId );
+            // don't populate mtiCache here: mti.upName may not be set
+        }
+        return entryId;
+    }
+
+
+    /**
+     * Fetches the tree info (parent ID, normalized RDN, user provided RDN)
+     * of an entry from the master table. Results are cached in both
+     * directions (id to info and info to id).
+     *
+     * @param id the entry ID
+     * @return the master tree info, null if the entry doesn't exist
+     * @throws Exception
+     */
+    private MasterTreeInfo fetchMasterTreeInfo( Long id ) throws Exception
+    {
+        // the virtual root has no tree info
+        if ( id == null || id.equals( ROOT_ID ) )
+        {
+            return null;
+        }
+        if ( mtiCache.contains( id ) )
+        {
+            return mtiCache.get( id );
+        }
+
+        Get mtiGet = new Get( Bytes.toBytes( id ) );
+        mtiGet.addColumn( TREE_INFO_FAMILY, PARENT_ID_QUALIFIER );
+        mtiGet.addColumn( TREE_INFO_FAMILY, NORM_RDN_QUALIFIER );
+        mtiGet.addColumn( TREE_INFO_FAMILY, UP_RDN_QUALIFIER );
+        Result mtiResult = HBaseTableHelper.get( getMasterTablePool(), masterTableName, mtiGet );
+        if ( mtiResult.getRow() == null )
+        {
+            return null;
+        }
+
+        Long parent = Bytes.toLong( mtiResult.getValue( TREE_INFO_FAMILY, PARENT_ID_QUALIFIER ) );
+        String normName = Bytes.toString( mtiResult.getValue( TREE_INFO_FAMILY, NORM_RDN_QUALIFIER ) );
+        String upName = Bytes.toString( mtiResult.getValue( TREE_INFO_FAMILY, UP_RDN_QUALIFIER ) );
+        MasterTreeInfo mti = new MasterTreeInfo( parent, normName, upName );
+
+        mtiCache.put( id, mti );
+        idCache.put( mti.string, id );
+        return mti;
+    }
+
+    /**
+     * Value object bundling the tree table key components of an entry:
+     * the parent ID, the normalized RDN and the user provided RDN.
+     */
+    static class MasterTreeInfo
+    {
+        /** the parent entry ID, ROOT_ID for the suffix entry */
+        private final Long parentId;
+        /** the normalized RDN (the full normalized DN for the suffix entry) */
+        private final String normName;
+        /** the user provided RDN, may be null for lookup-only instances */
+        private final String upName;
+        /** row key in the tree table: parentId bytes + ',' + normName bytes */
+        public final byte[] treeTableKey;
+        /** string form "&lt;parentId&gt;,&lt;normName&gt;", used as cache key */
+        public final String string;
+
+
+        public MasterTreeInfo( Long parentId, String normName, String upName )
+        {
+            this.parentId = parentId;
+            this.normName = normName;
+            this.upName = upName;
+            this.treeTableKey = Bytes.add( Bytes.toBytes( parentId ), KEY_DELIMITER_BYTES, Bytes.toBytes( normName ) );
+            // fixed: 'parentId + KEY_DELIMITER' performed numeric addition
+            // (Long + char promotes both to long), producing keys like
+            // "(parentId+44)normName" that can collide for different
+            // (parentId, normName) pairs; force string concatenation instead
+            this.string = parentId + String.valueOf( KEY_DELIMITER ) + normName;
+        }
+    }
+
+}

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java?rev=902620&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java Sun Jan 24 19:04:37 2010
@@ -0,0 +1,212 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.table;
+
+
+import org.apache.directory.server.core.partition.hbase.Cache;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+import org.apache.directory.shared.ldap.util.Base64;
+import org.apache.directory.shared.ldap.util.ByteBuffer;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * HBase specific implementation of an index table for attribute presence 
+ * information. 
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class HBasePresenceIndexTable
+{
+    //    private static final Logger LOG = LoggerFactory.getLogger( HBasePresenceIndexTable.class );
+
+    public static final byte[] COUNT_ROW = Bytes.toBytes( "!" );
+    public static final byte[] INFO_FAMILY = Bytes.toBytes( "info" );
+    public static final byte[] ID_QUALIFIER = Bytes.toBytes( "id" );
+    public static final byte[] COUNT_QUALIFIER = Bytes.toBytes( "count" );
+    public static final byte[] VALUE_SCAN_FIRST_ENTRYID = new byte[]
+        { 0x00 };
+    public static final byte[] VALUE_SCAN_LAST_ENTRYID = new byte[]
+        { ( byte ) 0xFF };
+
+    protected String indexTableName;
+    private HTablePool indexTablePool;
+    private SchemaManager schemaManager;
+    private String attributeTypeOid;
+
+    private Cache<String, Long> countCache;
+
+
+    /**
+     * Creates a presence index table for the given attribute type.
+     *
+     * @param attributeTypeOid the OID of the indexed attribute type
+     * @param schemaManager the schema manager used for name lookups
+     * @param tablePrefix prefix prepended to the HBase table name
+     * @throws Exception if the attribute type name lookup fails
+     */
+    public HBasePresenceIndexTable( String attributeTypeOid, SchemaManager schemaManager, String tablePrefix )
+        throws Exception
+    {
+        this.attributeTypeOid = attributeTypeOid;
+        this.schemaManager = schemaManager;
+        // the table is named after the attribute's primary name, not its OID
+        String primaryName = schemaManager.getGlobalOidRegistry().getPrimaryName( attributeTypeOid );
+        this.indexTableName = tablePrefix + "index_" + primaryName;
+        this.countCache = new Cache<String, Long>();
+    }
+
+
+    /**
+     * Releases resources held by this index table. Currently a no-op; the
+     * lazily created HTable pool is not explicitly closed here.
+     */
+    public void destroy() throws Exception
+    {
+    }
+
+
+    /**
+     * Adds a presence index row for the given entry ID.
+     *
+     * @param id the entry ID
+     * @throws Exception
+     */
+    public void add( Long id ) throws Exception
+    {
+        add( Bytes.toBytes( id ) );
+    }
+
+
+    /**
+     * Deletes the presence index row of the given entry ID.
+     *
+     * @param id the entry ID
+     * @throws Exception
+     */
+    public void delete( Long id ) throws Exception
+    {
+        delete( Bytes.toBytes( id ) );
+    }
+
+
+    /**
+     * Gets the presence index key.
+     * The key has the following syntax:
+     *   <pre>  *&lt;entryId&gt;</pre>
+     * <code>entryId</code> is the Base64 encoded 8-byte row key from the master table.
+     *
+     * @param entryId the entry ID, or one of the VALUE_SCAN_*_ENTRYID scan markers
+     * @return the presence index row key for the entry ID
+     * @throws Exception
+     */
+    public byte[] getPresenceKey( byte[] entryId ) throws Exception
+    {
+        ByteBuffer bb = new ByteBuffer();
+
+        bb.append( '*' );
+
+        // add entryId
+        // NOTE: the scan markers are deliberately compared by reference (==):
+        // they are sentinel instances and must be appended raw, not Base64
+        // encoded, so they sort before/after all encoded entry IDs
+        if ( entryId == VALUE_SCAN_FIRST_ENTRYID )
+        {
+            bb.append( VALUE_SCAN_FIRST_ENTRYID );
+        }
+        else if ( entryId == VALUE_SCAN_LAST_ENTRYID )
+        {
+            bb.append( VALUE_SCAN_LAST_ENTRYID );
+        }
+        else if ( entryId != null )
+        {
+            bb.append( Bytes.toBytes( String.valueOf( Base64.encode( entryId ) ) ) );
+        }
+
+        return bb.copyOfUsedBytes();
+    }
+
+
+    /**
+     * Gets the number of entries that contain the indexed attribute. The
+     * value is read from the count row and cached.
+     * NOTE(review): verify the cached value is invalidated when rows are
+     * added or deleted through this instance.
+     *
+     * @return the presence count
+     * @throws Exception
+     */
+    public int count() throws Exception
+    {
+        if ( countCache.contains( attributeTypeOid ) )
+        {
+            return countCache.get( attributeTypeOid ).intValue();
+        }
+
+        Long total = HBaseTableHelper.getLongValue( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY,
+            COUNT_QUALIFIER, 0L );
+        countCache.put( attributeTypeOid, total );
+        return total.intValue();
+    }
+
+
+    /**
+     * Checks if the entry has such an attribute (attributeType=*).
+     *
+     * @param id the entry id
+     * @return true if the entry contains such an attribute type
+     * @throws Exception
+     */
+    public boolean exists( Long id ) throws Exception
+    {
+        byte[] row = getPresenceKey( Bytes.toBytes( id ) );
+        Get get = new Get( row );
+        return HBaseTableHelper.exists( getIndexTablePool(), indexTableName, get );
+    }
+
+
+    /**
+     * Opens a scanner over the presence index table.
+     *
+     * @param scan the scan specification
+     * @return the result scanner
+     * @throws Exception
+     */
+    public ResultScanner getScanner( Scan scan ) throws Exception
+    {
+        ResultScanner scanner = HBaseTableHelper.getScanner( getIndexTablePool(), indexTableName, scan );
+        return scanner;
+    }
+
+
+    /**
+     * Adds a presence index row for the entry and increments the presence
+     * count if the row didn't exist before.
+     * NOTE(review): the get+put+increment sequence is not atomic.
+     *
+     * @param entryId the 8-byte entry ID
+     * @throws Exception
+     */
+    private void add( byte[] entryId ) throws Exception
+    {
+        // presence (attribute=*): *<id> -> id
+        // check first if the index already exists because we won't increment the index count
+        byte[] presenceRow = getPresenceKey( entryId );
+        Get presenceGet = new Get( presenceRow );
+        if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, presenceGet ) )
+        {
+            // get+put+put is not atomic!
+            Put presencePut = new Put( presenceRow );
+            presencePut.setWriteToWAL( false );
+            presencePut.add( INFO_FAMILY, ID_QUALIFIER, entryId );
+            HBaseTableHelper.put( getIndexTablePool(), indexTableName, presencePut );
+
+            // increment existence count: attribute: -> count
+            HBaseTableHelper.increment( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY, COUNT_QUALIFIER );
+
+            // fixed: invalidate the cached presence count, otherwise count()
+            // keeps returning the stale pre-increment value
+            countCache.clear();
+        }
+    }
+
+
+    /**
+     * Deletes the presence index row of the entry and decrements the
+     * presence count if the row existed.
+     * NOTE(review): the get+delete+decrement sequence is not atomic.
+     *
+     * @param entryId the 8-byte entry ID
+     * @throws Exception
+     */
+    private void delete( byte[] entryId ) throws Exception
+    {
+        // presence (attribute=*): *<id> -> id
+        // check first if the index exists because we won't decrement the index count otherwise
+        byte[] presenceRow = getPresenceKey( entryId );
+        Get presenceGet = new Get( presenceRow );
+        if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, presenceGet ) )
+        {
+            Delete presenceDel = new Delete( presenceRow );
+            HBaseTableHelper.delete( getIndexTablePool(), indexTableName, presenceDel );
+
+            // decrement existence count: attribute: -> count
+            HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY, COUNT_QUALIFIER );
+
+            // fixed: invalidate the cached presence count, otherwise count()
+            // keeps returning the stale pre-decrement value
+            countCache.clear();
+        }
+    }
+
+
+    /**
+     * Lazily creates the index table (if missing) and its table pool.
+     * NOTE(review): the lazy initialization is not thread-safe; confirm
+     * single-threaded access or add synchronization.
+     *
+     * @return the index table pool
+     */
+    protected HTablePool getIndexTablePool() throws Exception
+    {
+        if ( indexTablePool == null )
+        {
+            HBaseConfiguration configuration = HBaseTableHelper.getHBaseConfiguration();
+            // ensure table is created
+            HBaseTableHelper.createTable( configuration, indexTableName, INFO_FAMILY );
+            indexTablePool = new HTablePool( configuration, 16 );
+        }
+        return indexTablePool;
+    }
+
+}

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java?rev=902620&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java Sun Jan 24 19:04:37 2010
@@ -0,0 +1,207 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.table;
+
+
+import org.apache.directory.shared.ldap.schema.AttributeType;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+import org.apache.directory.shared.ldap.util.Base64;
+import org.apache.directory.shared.ldap.util.ByteBuffer;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
/**
 * HBase specific implementation of an user index table that stores each combination
 * of value and candidate in its own row. This implementation is intended to be used
 * for values that are used by many candidates, an example is the
 * objectClass:person attribute value that may be used by millions of entries.
 *
 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
 * @version $Rev$, $Date$
 */
public class HBaseRowIndexTable extends HBaseIndexTableBase
{
    private static final Logger LOG = LoggerFactory.getLogger( HBaseRowIndexTable.class );


    /**
     * Creates a new row index table for the given attribute type.
     *
     * @param attributeTypeOid the OID of the indexed attribute type
     * @param schemaManager the schema manager, used to normalize values and to
     *        look up the attribute type's syntax
     * @param tablePrefix the prefix of the HBase table name
     * @throws Exception if initialization fails
     */
    public HBaseRowIndexTable( String attributeTypeOid, SchemaManager schemaManager, String tablePrefix )
        throws Exception
    {
        super( attributeTypeOid, schemaManager, tablePrefix );
    }


    /**
     * Gets the index equals key. 
     * The key has the following syntax:
     *   <pre>  =value&lt;entryId&gt;</pre>
     * where <code>value</code> is the normalized value, and
     * <code>entryId</code> is the Base64 encoded 8-byte row key from the master table.
     *
     * @param value the value
     * @param id the entry ID, serialized to 8 bytes and Base64 encoded
     * @return the equals row key for the value
     * @throws Exception if value normalization fails
     */
    private byte[] getEqualsKey( Object value, Long id ) throws Exception
    {
        ByteBuffer bb = new ByteBuffer();

        bb.append( '=' );

        byte[] normValue = getNormalized( value );
        bb.append( normValue );

        // 8-byte long -> 12 Base64 characters appended after the value
        bb.append( Bytes.toBytes( String.valueOf( Base64.encode( Bytes.toBytes( id ) ) ) ) );

        return bb.copyOfUsedBytes();
    }


    /**
     * Builds a scan boundary key with the same layout as the equals key.
     * The special sentinel markers FULL_SCAN_START/STOP (for the value) and
     * VALUE_SCAN_START/STOP (for the entry ID) are compared by reference and
     * appended verbatim to form scan range boundaries; null parts are omitted.
     *
     * @param value the value, a FULL_SCAN_* marker, or null
     * @param entryId the entry ID bytes, a VALUE_SCAN_* marker, or null
     * @return the scan row key
     * @throws Exception if value normalization fails
     */
    public byte[] getScanKey( Object value, byte[] entryId ) throws Exception
    {
        ByteBuffer bb = new ByteBuffer();

        bb.append( '=' );

        // add value
        // there are special values to support attribute scan
        if ( value == FULL_SCAN_START )
        {
            bb.append( FULL_SCAN_START );
        }
        else if ( value == FULL_SCAN_STOP )
        {
            bb.append( FULL_SCAN_STOP );
        }
        else if ( value != null )
        {
            byte[] normValue = getNormalized( value );
            bb.append( normValue );
        }

        // add entryId
        if ( entryId == VALUE_SCAN_START )
        {
            bb.append( VALUE_SCAN_START );
        }
        else if ( entryId == VALUE_SCAN_STOP )
        {
            bb.append( VALUE_SCAN_STOP );
        }
        else if ( entryId != null )
        {
            bb.append( Bytes.toBytes( String.valueOf( Base64.encode( entryId ) ) ) );
        }

        return bb.copyOfUsedBytes();
    }


    /**
     * Extracts the value from an equals row key by stripping the leading '='
     * marker and the trailing 12-character Base64 encoded entry ID.
     *
     * @param row the equals row key
     * @return the value as String for human readable syntaxes, as byte[] otherwise
     * @throws Exception if the attribute type lookup fails
     */
    public Object getValueFromEqualsKey( byte[] row ) throws Exception
    {
        byte[] value = Bytes.tail( row, row.length - 1 );
        value = Bytes.head( value, value.length - 12 );
        AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid );
        return at.getSyntax().isHumanReadable() ? Bytes.toString( value ) : value;

    }


    /* (non-Javadoc)
     * @see org.apache.directory.server.core.partition.hbase.table.HBaseIndexTable#count(java.lang.Object)
     */
    // Reads the per-value count column, consulting/filling the count cache first.
    public int count( Object value ) throws Exception
    {
        if ( countCache.contains( value ) )
        {
            return countCache.get( value ).intValue();
        }

        byte[] row = getCountKey( value );
        Long count = HBaseTableHelper.getLongValue( getIndexTablePool(), indexTableName, row, INFO_FAMILY,
            COUNT_QUALIFIER, 0L );
        countCache.put( value, count );
        return count.intValue();
    }


    /**
     * Checks whether the equals index row for the given value/entry pair
     * exists, consulting/filling the exists cache first.
     *
     * @param value the value
     * @param id the entry ID
     * @return true if the index row exists
     * @throws Exception if the HBase operation fails
     */
    public boolean exists( Object value, Long id ) throws Exception
    {
        String key = value + ":" + id;
        if ( existsCache.contains( key ) )
        {
            return existsCache.get( key );
        }

        byte[] row = getEqualsKey( value, id );
        Get get = new Get( row );
        boolean exists = HBaseTableHelper.exists( getIndexTablePool(), indexTableName, get );
        existsCache.put( key, exists );
        return exists;
    }


    /**
     * Adds the equals index row for the given value/entry pair and increments
     * the per-value count, but only if the row doesn't exist yet so the count
     * isn't incremented twice for the same pair.
     *
     * @param value the normalized value bytes
     * @param id the entry ID
     * @throws Exception if an HBase operation fails
     */
    protected void add( byte[] value, Long id ) throws Exception
    {
        // exact match (attribute=value): =value<id> -> id, value
        // check first if the index already exists because we won't increment the index count
        byte[] exactRow = getEqualsKey( value, id );
        Get exactGet = new Get( exactRow );
        if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
        {
            // get+put+put is not atomic!
            Put exactPut = new Put( exactRow );
            exactPut.setWriteToWAL( false );
            exactPut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) );
            HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut );

            // increment exact match count: #value -> count
            byte[] exactCountRow = getCountKey( value );
            HBaseTableHelper.increment( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
                COUNT_QUALIFIER );
        }
    }


    /**
     * Deletes the equals index row for the given value/entry pair and
     * decrements the per-value count, but only if the row exists so the count
     * isn't decremented for a pair that was never indexed.
     *
     * @param value the normalized value bytes
     * @param id the entry ID
     * @throws Exception if an HBase operation fails
     */
    protected void delete( byte[] value, Long id ) throws Exception
    {
        // exact match (attribute=value): =value<id> -> id, value
        // check first if the index exists because we won't decrement the index count otherwise
        byte[] exactRow = getEqualsKey( value, id );
        Get exactGet = new Get( exactRow );
        if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
        {
            Delete exactDel = new Delete( exactRow );
            HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel );

            // decrement exact match count: #value -> count
            byte[] exactCountRow = getCountKey( value );
            HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
                COUNT_QUALIFIER );
        }
    }

}

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseTableHelper.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseTableHelper.java?rev=902620&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseTableHelper.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseTableHelper.java Sun Jan 24 19:04:37 2010
@@ -0,0 +1,409 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.table;
+
+
+import org.apache.directory.server.core.partition.hbase.Utils;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HBaseTableHelper
+{
+
+    private static final Logger LOG = LoggerFactory.getLogger( HBaseTableHelper.class );
+
+    public static long RPC_COUNT = 0;
+
+
+    /**
+     * Decrements a value.
+     * 
+     * @param pool the table pool
+     * @param tableName the table name
+     * @param row the row
+     * @param family the family
+     * @param qualifier the qualifier
+     * 
+     * @return the byte[] the decremented value
+     * 
+     * @throws Exception the exception
+     */
+    public static byte[] decrement( HTablePool pool, String tableName, byte[] row, byte[] family, byte[] qualifier )
+        throws Exception
+    {
+        byte[] decrement = increment( pool, tableName, row, family, qualifier, -1L );
+        if ( LOG.isDebugEnabled() )
+        {
+            LOG.debug( "Decrement " + tableName + ":" + Utils.getPrintableString( row ) + ":"
+                + Utils.getPrintableString( family ) + ":" + Utils.getPrintableString( qualifier ) + "="
+                + Utils.getPrintableString( decrement ) );
+            logStack( 3 );
+        }
+        return decrement;
+    }
+
+
+    /**
+     * Increments a value.
+     * 
+     * @param pool the table pool
+     * @param tableName the table name
+     * @param row the row
+     * @param family the family
+     * @param qualifier the qualifier
+     * 
+     * @return the byte[] the incremented value
+     * 
+     * @throws Exception the exception
+     */
+    public static byte[] increment( HTablePool pool, String tableName, byte[] row, byte[] family, byte[] qualifier )
+        throws Exception
+    {
+        byte[] increment = increment( pool, tableName, row, family, qualifier, 1L );
+        if ( LOG.isDebugEnabled() )
+        {
+            LOG.debug( "Increment " + tableName + ":" + Utils.getPrintableString( row ) + ":"
+                + Utils.getPrintableString( family ) + ":" + Utils.getPrintableString( qualifier ) + "="
+                + Utils.getPrintableString( increment ) );
+            logStack( 3 );
+        }
+        return increment;
+    }
+
+
+    private static byte[] increment( HTablePool pool, String tableName, byte[] row, byte[] family, byte[] qualifier,
+        long amount ) throws Exception
+    {
+        HTable table = pool.getTable( tableName );
+        try
+        {
+            long id = table.incrementColumnValue( row, family, qualifier, amount, false );
+            RPC_COUNT++;
+            return Bytes.toBytes( id );
+        }
+        finally
+        {
+            pool.putTable( table );
+        }
+    }
+
+
+    /**
+     * Gets the long value of a column, the default value if the column doesn't exist.
+     * 
+     * @param pool the table pool
+     * @param tableName the table name
+     * @param row the row
+     * @param family the family
+     * @param qualifier the qualifier
+     * @param defaultValue the default value if the column doesn't exist
+     * 
+     * @return the long value
+     * 
+     * @throws Exception the exception
+     */
+    public static Long getLongValue( HTablePool pool, String tableName, byte[] row, byte[] family, byte[] qualifier,
+        Long defaultValue ) throws Exception
+    {
+        HTable table = pool.getTable( tableName );
+        try
+        {
+            Get get = new Get( row );
+            get.addColumn( family, qualifier );
+            Result result = table.get( get );
+            RPC_COUNT++;
+            byte[] value = result.getValue( family, qualifier );
+            Long l;
+            if ( value == null )
+            {
+                l = defaultValue;
+            }
+            else
+            {
+                l = Bytes.toLong( value );
+            }
+
+            if ( LOG.isDebugEnabled() )
+            {
+                LOG.debug( "Get Long from " + tableName + ":" + Utils.getPrintableString( row ) + ":"
+                    + Utils.getPrintableString( family ) + ":" + Utils.getPrintableString( qualifier ) + " -> " + l );
+                logStack( 2 );
+            }
+            return l;
+        }
+        finally
+        {
+            pool.putTable( table );
+        }
+    }
+
+
+    /**
+     * Commits a {@link Put} to the table.
+     * 
+     * @param pool the table pool
+     * @param tableName the table name
+     * @param put the put
+     * 
+     * @throws Exception the exception
+     */
+    public static void put( HTablePool pool, String tableName, Put put ) throws Exception
+    {
+        if ( LOG.isDebugEnabled() )
+        {
+            String row = Utils.getPrintableString( put.getRow() );
+            int size = put.size();
+            LOG.debug( "Put to " + tableName + ": row=" + row + ", nbKV=" + size );
+            logStack( 2 );
+        }
+
+        HTable table = pool.getTable( tableName );
+        try
+        {
+            table.put( put );
+            RPC_COUNT++;
+        }
+        finally
+        {
+            pool.putTable( table );
+        }
+    }
+
+
+    /**
+     * Commits a {@link Delete} to the table.
+     * 
+     * @param pool the table pool
+     * @param tableName the table name
+     * @param delete the delete
+     * 
+     * @throws Exception the exception
+     */
+    public static void delete( HTablePool pool, String tableName, Delete delete ) throws Exception
+    {
+        if ( LOG.isDebugEnabled() )
+        {
+            String row = Utils.getPrintableString( delete.getRow() );
+            if ( delete.isEmpty() )
+            {
+                LOG.debug( "Delete from " + tableName + ": row=" + row );
+                logStack( 2 );
+            }
+            else
+            {
+                int size = delete.getFamilyMap().size();
+                LOG.debug( "Delete from " + tableName + ": row=" + row + ", nbKV=" + size );
+                logStack( 2 );
+            }
+        }
+
+        HTable table = pool.getTable( tableName );
+        try
+        {
+            table.delete( delete );
+            RPC_COUNT++;
+        }
+        finally
+        {
+            pool.putTable( table );
+        }
+    }
+
+
+    /**
+     * Gets data from a table row.
+     * 
+     * @param pool the table pool
+     * @param tableName the table name
+     * @param get the get
+     * 
+     * @return the result
+     * 
+     * @throws Exception the exception
+     */
+    public static Result get( HTablePool pool, String tableName, Get get ) throws Exception
+    {
+        if ( LOG.isDebugEnabled() )
+        {
+            String row = Utils.getPrintableString( get.getRow() );
+            int size = get.getFamilyMap().size();
+            LOG.debug( "Get from " + tableName + ": row=" + row + ", nbFamilies=" + size );
+            logStack( 2 );
+        }
+
+        HTable table = pool.getTable( tableName );
+        try
+        {
+            Result result = table.get( get );
+            RPC_COUNT++;
+            return result;
+        }
+        finally
+        {
+            pool.putTable( table );
+        }
+    }
+
+
+    /**
+     * Checks if the data exist.
+     * 
+     * @param pool the table pool
+     * @param tableName the table name
+     * @param get the get
+     * 
+     * @return true, if the data exist
+     * 
+     * @throws Exception the exception
+     */
+    public static boolean exists( HTablePool pool, String tableName, Get get ) throws Exception
+    {
+        if ( LOG.isDebugEnabled() )
+        {
+            String row = Utils.getPrintableString( get.getRow() );
+            LOG.debug( "Exists from " + tableName + ": row=" + row );
+            logStack( 2 );
+        }
+
+        HTable table = pool.getTable( tableName );
+        try
+        {
+            boolean exists = table.exists( get );
+            RPC_COUNT++;
+            return exists;
+        }
+        finally
+        {
+            pool.putTable( table );
+        }
+    }
+
+
+    /**
+     * Gets a scanner.
+     * 
+     * @param pool the table pool
+     * @param tableName the table name
+     * @param scan the scan
+     * 
+     * @return the scanner
+     * 
+     * @throws Exception the exception
+     */
+    public static ResultScanner getScanner( HTablePool pool, String tableName, Scan scan ) throws Exception
+    {
+        if ( LOG.isDebugEnabled() )
+        {
+            String startRow = Utils.getPrintableString( scan.getStartRow() );
+            String stopRow = Utils.getPrintableString( scan.getStopRow() );
+            String filter = scan.getFilter() == null ? "" : scan.getFilter().getClass().getSimpleName();
+            LOG.debug( "Get Scanner from " + tableName + ": start=" + startRow + ", stop=" + stopRow + ", filter="
+                + filter );
+            logStack( 2 );
+        }
+
+        HTable table = pool.getTable( tableName );
+        try
+        {
+            ResultScanner scanner = table.getScanner( scan );
+            RPC_COUNT++;
+            return scanner;
+        }
+        finally
+        {
+            pool.putTable( table );
+        }
+    }
+
+
+    /**
+     * Creates the table.
+     * 
+     * @param configuration the configuration
+     * @param tableName the table name
+     * @param families the families
+     * 
+     * @throws Exception the exception
+     */
+    public static void createTable( HBaseConfiguration configuration, String tableName, byte[]... families )
+        throws Exception
+    {
+        // create table on the fly if it doesn't exist yet
+        HBaseAdmin admin = new HBaseAdmin( configuration );
+        if ( !admin.tableExists( tableName ) )
+        {
+            HTableDescriptor descriptor = new HTableDescriptor( tableName );
+            for ( byte[] family : families )
+            {
+                HColumnDescriptor columnDescriptor = new HColumnDescriptor( family );
+                columnDescriptor.setMaxVersions( 1 );
+                //columnDescriptor.setInMemory( true );
+                //columnDescriptor.setCompressionType( Algorithm.LZO );
+                descriptor.addFamily( columnDescriptor );
+
+            }
+            admin.createTable( descriptor );
+        }
+    }
+
+
+    /**
+     * Gets the configuration.
+     * 
+     * @return the configuration
+     */
+    public static HBaseConfiguration getHBaseConfiguration()
+    {
+        return new HBaseConfiguration();
+    }
+
+
+    private static void logStack( int start )
+    {
+        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+        for ( int i = start + 1; i < stackTrace.length; i++ )
+        {
+            String className = stackTrace[i].getClassName();
+            String packageName = className.substring( 0, className.lastIndexOf( '.' ) );
+            if ( !packageName.startsWith( "org.apache.directory.server.core.partition.hbase" ) )
+            {
+                break;
+            }
+            className = className.substring( className.lastIndexOf( '.' ) + 1 );
+            String methodName = stackTrace[i].getMethodName();
+            LOG.debug( "  called from " + className + "." + methodName );
+        }
+    }
+
+}

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/xdbmext/ExtendedCursorBuilder.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/xdbmext/ExtendedCursorBuilder.java?rev=902620&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/xdbmext/ExtendedCursorBuilder.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/xdbmext/ExtendedCursorBuilder.java Sun Jan 24 19:04:37 2010
@@ -0,0 +1,101 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.directory.server.core.partition.hbase.xdbmext;
+
+
+import org.apache.directory.server.core.entry.ServerEntry;
+import org.apache.directory.server.xdbm.IndexCursor;
+import org.apache.directory.server.xdbm.Store;
+import org.apache.directory.server.xdbm.search.impl.CursorBuilder;
+import org.apache.directory.server.xdbm.search.impl.OneLevelScopeEvaluator;
+import org.apache.directory.server.xdbm.search.impl.SubtreeScopeEvaluator;
+import org.apache.directory.shared.ldap.filter.AndNode;
+import org.apache.directory.shared.ldap.filter.AssertionType;
+import org.apache.directory.shared.ldap.filter.ExprNode;
+import org.apache.directory.shared.ldap.filter.ScopeNode;
+import org.apache.directory.shared.ldap.filter.SearchScope;
+
+
+/**
+ * Builds Cursors over candidates that satisfy a filter expression.
+ * 
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev: 659774 $
+ */
+public class ExtendedCursorBuilder extends CursorBuilder
+{
+    /** The database used by this builder */
+    private Store<ServerEntry> db = null;
+
+    /** Evaluator dependency on a EvaluatorBuilder */
+    private ExtendedEvaluatorBuilder evaluatorBuilder;
+
+
+    /**
+     * Creates an expression tree enumerator.
+     *
+     * @param db database used by this enumerator
+     * @param evaluatorBuilder the evaluator builder
+     */
+    public ExtendedCursorBuilder( Store<ServerEntry> db, ExtendedEvaluatorBuilder evaluatorBuilder )
+    {
+        super( db, evaluatorBuilder );
+        this.db = db;
+        this.evaluatorBuilder = evaluatorBuilder;
+    }
+
+
+    public IndexCursor<?, ServerEntry> build( ExprNode node ) throws Exception
+    {
+        // set the "real" filter node as annotation to the scope node
+        if ( node.getAssertionType() == AssertionType.AND )
+        {
+            AndNode andNode = ( AndNode ) node;
+            if ( andNode.getChildren().size() == 2 && andNode.getFirstChild().getAssertionType() == AssertionType.SCOPE )
+            {
+                andNode.getFirstChild().set( "filter", andNode.getChildren().get( 1 ) );
+            }
+        }
+
+        switch ( node.getAssertionType() )
+        {
+            case SUBSTRING:
+                return new ExtendedSubstringCursor( db, ( ExtendedSubstringEvaluator ) evaluatorBuilder.build( node ) );
+
+            case SCOPE:
+                ExprNode filter = ( ExprNode ) node.get( "filter" );
+
+                if ( ( ( ScopeNode ) node ).getScope() == SearchScope.ONELEVEL )
+                {
+                    return new ExtendedOneLevelScopeCursor( db, filter, ( OneLevelScopeEvaluator ) evaluatorBuilder
+                        .build( node ) );
+                }
+                else
+                {
+                    return new ExtendedSubtreeScopeCursor( db, filter, ( SubtreeScopeEvaluator ) evaluatorBuilder
+                        .build( node ) );
+                }
+
+            default:
+                return super.build( node );
+        }
+    }
+
+}

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/xdbmext/ExtendedEvaluatorBuilder.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/xdbmext/ExtendedEvaluatorBuilder.java?rev=902620&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/xdbmext/ExtendedEvaluatorBuilder.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/xdbmext/ExtendedEvaluatorBuilder.java Sun Jan 24 19:04:37 2010
@@ -0,0 +1,73 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.directory.server.core.partition.hbase.xdbmext;
+
+
+import org.apache.directory.server.core.entry.ServerEntry;
+import org.apache.directory.server.xdbm.Store;
+import org.apache.directory.server.xdbm.search.Evaluator;
+import org.apache.directory.server.xdbm.search.impl.EvaluatorBuilder;
+import org.apache.directory.shared.ldap.filter.AssertionType;
+import org.apache.directory.shared.ldap.filter.ExprNode;
+import org.apache.directory.shared.ldap.filter.SubstringNode;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+
+
+/**
+ * Top level filter expression evaluator builder implemenation.
+ * 
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev: 659774 $
+ */
+public class ExtendedEvaluatorBuilder extends EvaluatorBuilder
+{
+    private final Store<ServerEntry> db;
+    private final SchemaManager schemaManager;
+
+
+    /**
+     * Creates a top level Evaluator where leaves are delegated to a leaf node
+     * evaluator which will be created.
+     *
+     * @param db the database this evaluator operates upon
+     * @param registries the schema registries
+     * @throws Exception failure to access db or lookup schema in registries
+     */
+    public ExtendedEvaluatorBuilder( Store<ServerEntry> db, SchemaManager schemaManager ) throws Exception
+    {
+        super( db, schemaManager );
+        this.db = db;
+        this.schemaManager = schemaManager;
+    }
+
+
+    public Evaluator<? extends ExprNode, ServerEntry> build( ExprNode node ) throws Exception
+    {
+        if ( node.getAssertionType() == AssertionType.SUBSTRING )
+        {
+            return new ExtendedSubstringEvaluator( ( SubstringNode ) node, db, schemaManager );
+        }
+        else
+        {
+            return super.build( node );
+        }
+    }
+
+}



Mime
View raw message